package kafka_pool

import "sync"

// IkPool describes a pool of byte payloads that can be written to, read from
// and subscribed to.
//
// NOTE(review): (*KPool).Subscribe in this file takes a Sub callback, whereas
// Subscribe here takes no arguments, so *KPool does not satisfy IkPool as
// declared — confirm which signature is intended.
type IkPool interface {
	// SetData hands one payload to the pool.
	SetData(data []byte) error

	// GetData returns one payload from the pool.
	GetData() ([]byte, error)

	Subscribe() // subscribe to data
}

// Sub is the callback type accepted by (*KPool).Subscribe. It is invoked with
// the pool and one payload received from the pool's SubData channel.
type Sub func(c *KPool, msg []byte)

// KPool is a mutex-guarded FIFO queue of byte payloads with an optional
// subscriber channel.
type KPool struct {
	// SubData carries payloads to the goroutine started by Subscribe.
	SubData chan []byte

	// Data holds the payloads queued by SetData until GetData removes them.
	Data [][]byte

	// mux serializes access to Data in SetData and GetData.
	mux sync.Mutex

	// HaveSub makes SetData publish each payload on SubData as well.
	HaveSub bool

	// MaxCache is the maximum cached data length.
	// NOTE(review): none of the methods visible in this file enforce it.
	MaxCache int
}


// NewKPool returns a KPool with an empty, non-nil Data queue and a SubData
// channel of capacity 1, then applies the given options in order.
//
// HaveSub starts as false and MaxCache as 0; an option may overwrite any
// field. Nil options are skipped rather than causing a nil-function panic.
//
// TODO: the original author marked this constructor as unfinished.
func NewKPool(option ...Option) *KPool {
	k := &KPool{
		SubData:  make(chan []byte, 1),
		Data:     make([][]byte, 0),
		HaveSub:  false,
		MaxCache: 0,
	}

	for _, o := range option {
		if o == nil {
			continue
		}
		o(k)
	}

	return k
}

// Subscribe starts a background goroutine that passes every payload received
// on SubData to fn, together with the pool itself.
//
// The goroutine blocks while no payload is pending (it does not poll) and
// exits once SubData is closed, which is the only way to stop it. A nil fn is
// ignored and no goroutine is started.
//
// NOTE(review): Subscribe does not modify HaveSub, and SetData only publishes
// on SubData when HaveSub is true — confirm how callers are expected to
// enable it.
func (c *KPool) Subscribe(fn Sub) {
	if fn == nil {
		return
	}

	go func() {
		// Ranging over the channel parks the goroutine until data arrives and
		// terminates the loop when the channel is closed.
		for data := range c.SubData {
			// Further processing can be done here.
			fn(c, data)
		}
	}()
}

// SetData appends data to the queue and, when HaveSub is true, also publishes
// it on SubData. It always returns nil.
//
// The slice is stored as given, without copying. The mutex is released before
// the channel send: the send can block until the subscriber drains SubData,
// and holding the lock during that wait would stall GetData and every other
// SetData call (and deadlock a subscriber callback that uses the pool).
func (c *KPool) SetData(data []byte) error {
	c.mux.Lock()
	c.Data = append(c.Data, data)
	notify := c.HaveSub
	c.mux.Unlock()

	if notify {
		// May block until there is room in SubData.
		c.SubData <- data
	}

	return nil
}

// GetData removes and returns the oldest queued payload.
//
// When the queue is empty it returns (nil, nil); the error result is always
// nil. The vacated slot is cleared before the slice is advanced so the
// backing array does not keep the returned payload reachable.
func (c *KPool) GetData() ([]byte, error) {
	c.mux.Lock()
	defer c.mux.Unlock()

	if len(c.Data) == 0 {
		return nil, nil
	}

	data := c.Data[0]
	c.Data[0] = nil // drop the queue's reference to the popped payload
	c.Data = c.Data[1:]

	return data, nil
}